package com.ivyft.jetty.yarn;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * <pre>
 *
 * Created by IntelliJ IDEA.
 * User: zhenqin
 * Date: 15/12/16
 * Time: 16:20
 * To change this template use File | Settings | File Templates.
 *
 * </pre>
 *
 * @author zhenqin
 */
public class ZookeeperJettyPublisher implements JettyPublisher, Closeable {


    /**
     * ZKClient
     */
    protected ZkClient zkClient;


    /**
     * 数据写入到 ZooKeeper 的 root path
     */
    protected String rootPath;


    /**
     * Task Conf
     */
    protected JettyConfiguration conf;


    /**
     * LOG
     */
    private static Logger LOG = LoggerFactory.getLogger(ZookeeperJettyPublisher.class);


    public ZookeeperJettyPublisher() {
    }

    @Override
    public void init(JettyConfiguration jettyConf) {
        this.conf = jettyConf;
        rootPath = jettyConf.getProperty("jetty.yarn.zookeeper.root", "/jetty");
        LOG.info("register to zookeeper path " + rootPath);

        String zookeeperServer = jettyConf.getString("jetty.yarn.publisher.zookeeper.server");
        LOG.info("connect to " + zookeeperServer);
        zkClient = new ZkClient(zookeeperServer,
                jettyConf.getInt("jetty.yarn.publisher.zookeeper.sessionTimeout", 6000),
                jettyConf.getInt("jetty.yarn.publisher.zookeeper.connectionTimeout", 60000));

        if(!zkClient.exists(rootPath)) {
            zkClient.createPersistent(rootPath);
        }
    }

    @Override
    public void publish(String host, int port) {
        String path = rootPath + "/" + host + ":" + port;
        LOG.info("jetty server published zookeeper at: " + path);

        String str = "";
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            JettyConfiguration myConf = new JettyConfiguration();
            myConf.copy(this.conf);
            myConf.setProperty("zookeeper.path", path);

            boolean copyEnv =  myConf.getBoolean("jetty.yarn.publisher.zookeeper.copyEnv", false);
            boolean copySysProp =  myConf.getBoolean("jetty.yarn.publisher.zookeeper.copySysProp", false);
            if(copyEnv) {
                Map<String, String> getenv = System.getenv();
                for (Map.Entry<String, String> entry : getenv.entrySet()) {
                    myConf.setProperty(entry.getKey(), entry.getValue());
                }
            }

            if(copySysProp) {
                Properties properties = System.getProperties();
                for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                    myConf.setProperty(entry.getKey().toString(), entry.getValue());
                }
            }

            myConf.save(out);
            out.flush();
            out.close();
            str = new String(out.toByteArray(), "UTF-8");
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error("序列化配置信息异常。", e);
        }

        zkClient.createEphemeral(path, str);
    }

    @Override
    public void close() throws IOException {
        if(zkClient != null) {
            zkClient.close();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        SerializableSerializer serializer = new SerializableSerializer();
        byte[] heloos = serializer.serialize("heloo");

        System.out.println(heloos.length);
        System.out.println(serializer.deserialize(heloos));

        final ZkClient zkClient = new ZkClient("192.168.1.101:2181",6000, 60000);
        List<String> children = zkClient.getChildren("/jetty");
        for (String child : children) {
            System.out.println("----------->> " + child);
            Object data = zkClient.readData("/jetty/" + child);
            System.out.println(data);
        }

        zkClient.subscribeChildChanges("/jetty", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parent path: " + parentPath);
                System.out.println("==========================");

                for (String child : currentChilds) {
                    System.out.println("-----------------------------");
                    System.out.println("child: " + child);
                    Object data = zkClient.readData("/jetty/" + child);
                    System.out.println(data);
                    System.out.println("-----------------------------");
                }
                System.out.println("==========================");
            }
        });

        while (true) {
            Thread.sleep(3000);
        }

    }
}
